集成elastic

您所在的位置:网站首页 elasticjob springboot 集成elastic

集成elastic

2023-03-24 18:58| 来源: 网络整理| 查看: 265

前言

定时任务这一组件在工作过程中经常使用到,在单机节点上可以直接选择使用Spring自带的定时任务组件hubble-task,而这种定时任务一旦确定固化了定时触发策略,也无法动态开启关闭,所以后来有了Quartz。

Quartz是定时任务领域的一个开源项目,由JAVA开发,可以通过API调度定时任务的启停及策略,还有对JTA事务跟集群的支持等等强大功能。

但是Quartz又有它的一些缺点:

Quartz调整定时任务需要通过API的方式进行调度,本质上还是没有脱离业务系统。Quartz需要持久化数据到底层数据表,对业务系统的数据侵入较高。Quartz也并没有支持分布式的调度执行,只能做到单个任务单个执行

而elastic-job就是当当在Quartz的基础上进行了二次封装,elastic-job有两种版本:

Elastic-Job-Cloud:针对微服务的部署方式Elastic-Job-Lite:基于zookeeper作为注册中心的部署方式

这两个版本除了部署方式不一样在api上是一样的,elastic-job相对于Quartz增加了很多新特性:

支持UI页面,可以在web页面上动态调整定时策略跟启停基于Zookeeper作为分布式的调度,调度跟任务解耦支持了分布式调度分片,同一个任务可以分成多片执行作业类型多种,支持Simple、DataFLow数据流、Script脚本失效转移,下线的机器的任务会重新分片执行作业分片的一致性,任务分片后不会重复执行错过执行的作业补偿 安装

安装elastic-job-lite方式,需要提前安装zookeeper,如果需要安装教程可以看这篇文章:Linux在线安装Zookeeper

elastic-job在apache的地址:elasticjob

然后就需要运行包含Elastic-Job-Lite和业务代码的jar文件。不限于jar或war的启动方式。

源码地址:elastic-job-lite

启动elastic-job-lite-console

下载2.1.4版本的源码:

https://codeload.github.com/apache/shardingsphere-elasticjob/zip/refs/tags/2.1.4

下载完成解压后有如下目录:

在这里插入图片描述

进入elastic-job-lite文件下的elastic-job-lite-console。在此目录下进行打包,打包命令

mvn clean install -Dmaven.test.skip=true

打包好了启动jar包即可,也可以直接启动源码,找到console模块下的ConsoleBootstrap类进行启动

在这里插入图片描述

启动完成后访问ip:8899,账密为:root/root

进入系统后进入注册中心配置,填写需要注册的zookeeper地址进行连接。

在这里插入图片描述

下面是在linux中安装配置。也可以直接将打好的包放到linux中执行。

安装控制台

在elastic-job 3.0后没有了console模块,有了更加优美的ui控制台

这里直接下载控制台包:

https://archive.apache.org/dist/shardingsphere/elasticjob-ui-3.0.0-RC1/apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz

下载后上传到服务器进行解压

tar -zxvf apache-shardingsphere-elasticjob-3.0.0-RC1-lite-ui-bin.tar.gz

到bin文件里进行启动

./start.sh

启动成功后访问ip地址:8088,默认账号密码为root/root

在这里插入图片描述

进入后需要在全局配置-注册中心配置添加注册中心

在这里插入图片描述

新增完成后建立连接,elastic-job初步就搭建好了,如果想要引入数据源需要修改conf文件下的application.properties配置文件

我想要修改为mysql作为数据库,需要再lib文件中加入连接包,这个手动上传即可。

vim application.properties

修改为mysql的驱动跟连接方式

在这里插入图片描述

保存文件然后重新启动elastic-job,在事件追踪数据源配置中添加数据源,如下图:

在这里插入图片描述

点击建立连接,后面定时任务的配置及日志会记录在表里

在这里插入图片描述

集成 简单集成

引入pom依赖

com.cxytiandi elastic-job-spring-boot-starter 1.0.0 org.apache.curator curator-framework 2.10.0 org.apache.curator curator-recipes 2.10.0

在配置文件application.properties添加配置:

elasticJob.zk.serverLists=localhost:2181 elasticJob.zk.namespace=user-sync

然后就可以直接定义一个job类来验证了,代码如下:

@Component @ElasticJobConf(name = "TestJob",cron="0 0 0 * * ?",shardingTotalCount=5) // 每天零点执行 public class TestJob extends SimpleJob{ @Override public void execute(ShardingContext shardingContext) { // 要执行的逻辑 } }

这种方式的定时策略是依赖在ElasticJobConf注解上,调整注解的配置即可。

常规集成

常规集成有三个类

ElasticJobConfig:elastic-job组件的配置,举例zookeeper配置中心ElasticJobHandler:job任务的具体执行类可以配置在这里ElasticJobListener:job任务的监听,开始跟结束ElasticJobProperties:从配置文件读取zookeeper配置

引入依赖

com.dangdang elastic-job-common-core ${elasticjob.version} guava com.google.guava curator-framework org.apache.curator com.dangdang elastic-job-lite-core ${elasticjob.version} com.dangdang elastic-job-lite-spring ${elasticjob.version} org.apache.curator curator-recipes 2.10.0 ElasticJobConfig import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; @Component @ConditionalOnProperty(name = "elasticjob.enabled", havingValue = "true") public class ElasticJobConfig { private final ElasticJobProperties jobProperties; public ElasticJobConfig(ElasticJobProperties jobProperties) { this.jobProperties = jobProperties; } @Bean(initMethod = "init") public ZookeeperRegistryCenter regCenter() { return new ZookeeperRegistryCenter(new ZookeeperConfiguration(jobProperties.getServerLists(), jobProperties.getNamespace())); } @Bean public ElasticJobListener elasticJobListener() { return new ElasticJobListener(100, 100); } } ElasticJobHandler import com.dangdang.ddframe.job.api.simple.SimpleJob; import com.dangdang.ddframe.job.config.JobCoreConfiguration; import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration; import com.dangdang.ddframe.job.lite.api.JobScheduler; import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration; import com.dangdang.ddframe.job.lite.spring.api.SpringJobScheduler; import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Slf4j @Configuration @AutoConfigureAfter(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class) @ConditionalOnBean(com.kyf.pe.trade.dataprocess.assist.config.ElasticJobConfig.class) public class ElasticJobHandlerConfig { private final ZookeeperRegistryCenter zookeeperRegistryCenter; public ElasticJobHandlerConfig(ZookeeperRegistryCenter zookeeperRegistryCenter) { this.zookeeperRegistryCenter = zookeeperRegistryCenter; } /** * 配置任务详细信息 * * @param jobClass 定时任务实现类 * @param cron 表达式 * @param shardingTotalCount 分片数 * @param shardingItemParameters 分片参数 * @return */ private LiteJobConfiguration getLiteJobConfiguration(final Class jobClass, final String cron, final int shardingTotalCount, final String shardingItemParameters, final String jobParameters, final String description) { // 定义作业核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(), cron, shardingTotalCount). shardingItemParameters(shardingItemParameters).jobParameter(jobParameters).description(description).build(); // 定义SIMPLE类型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName()); // 定义Lite作业根配置 return LiteJobConfiguration.newBuilder(simpleJobConfig).build(); } /** * 具体任务 */ @Bean(initMethod = "init") public JobScheduler pushHrhbJobScheduler(final TestJob testjob, @Value("${job.test.cron}") final String cron, @Value("${job.test.shardingTotalCount}") final int shardingTotalCount, @Value("${job.test.description}") final String description) { return new SpringJobScheduler(testjob, zookeeperRegistryCenter, getLiteJobConfiguration(testjob.getClass(), cron, shardingTotalCount, "", "", description)); } } ElasticJobListener import com.dangdang.ddframe.job.executor.ShardingContexts; import com.dangdang.ddframe.job.lite.api.listener.AbstractDistributeOnceElasticJobListener; import lombok.extern.slf4j.Slf4j; @Slf4j public class ElasticJobListener extends AbstractDistributeOnceElasticJobListener { /** * 设置间隔时间 * * @param startedTimeoutMilliseconds * @param completedTimeoutMilliseconds */ public ElasticJobListener(long startedTimeoutMilliseconds, long completedTimeoutMilliseconds) { super(startedTimeoutMilliseconds, completedTimeoutMilliseconds); } @Override public void doBeforeJobExecutedAtLastStarted(ShardingContexts shardingContexts) { log.info("任务名:{}开始", shardingContexts.getJobParameter()); } @Override public void doAfterJobExecutedAtLastCompleted(ShardingContexts shardingContexts) { log.info("任务名:{}结束", shardingContexts.getJobParameter()); } } ElasticJobProperties import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "elasticjob") public class ElasticJobProperties { private boolean enabled = true; private String serverLists; private String namespace; public boolean isEnabled() { return enabled; } public void setEnabled(boolean enabled) { this.enabled = enabled; } public String getServerLists() { return serverLists; } public void setServerLists(String serverLists) { this.serverLists = serverLists; } public String getNamespace() { return namespace; } public void setNamespace(String namespace) { this.namespace = namespace; } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3